-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persist demoted and removed brokers #2115
base: main
Are you sure you want to change the base?
Persist demoted and removed brokers #2115
Conversation
73361d4
to
63f99c5
Compare
* Simplified KafkaZkClient creation for debugging. * Added persistence for demoted and removed brokers. * Added tests and conformed to lint tests.
f1de99b
to
5ae4d83
Compare
Hey @morgangalpin thanks for this! Do you mind sharing your testing approach/details aside from code tests? |
Sure thing. We've been using it in production for several months now. I've also tried it out on a local dockerized kafka cluster. My manual testing only involved restarting CC to ensure the previous removed/demoted broker state was restored. |
Got it, let me try reproduce this in my test environment and i'll get back to you… |
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/persisteddata/BackingMethod.java
Outdated
Show resolved
Hide resolved
@mhratson any progress with trying it out? |
@mhratson just following up, is there anything else needed before we can merge this change? |
@CCisGG or @viktorsomogyi, I haven't heard back from @mhratson . Are you able to review and merge this PR? |
CONFIG = CruiseControlRequestConfig.define(CruiseControlParametersConfig.define(AnomalyDetectorConfig.define( | ||
AnalyzerConfig.define(ExecutorConfig.define(MonitorConfig.define(WebServerConfig.define( | ||
UserTaskManagerConfig.define(new ConfigDef())))))))).withClientSslSupport().withClientSaslSupport(); | ||
ConfigDef configDef = new ConfigDef(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes in this file look like a refactoring rather than part of the feature, can this be extracted in a separate PR?
maybeUpdateTopicConfig(adminClient, maintenanceEventTopic); | ||
maybeIncreasePartitionCount(adminClient, maintenanceEventTopic); | ||
} | ||
KafkaCruiseControlUtils.maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, an unrelated (though maybe tempting) refactoring…
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is needed by the new code, but isn't available when written as a protected method. The protected method looks like it can be removed because it is only used in one place, but I left it since removing it could break any subclasses that override it. I could mark it as @deprecated
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@morgangalpin sorry for this, I missed your ping. I started reviewing it, will likely do multiple rounds on it in the following days.
config/cruisecontrol.properties
Outdated
|
||
# The configs to apply to the kafka topic used to persist Cruise Control data. Only applies if | ||
# "persisted.data.persist.method" is set to "kafka". This "list" should be a semicolon separated | ||
# string of 'key=value' pairs. The keys and values need to be valid Kafka Topic configs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is unnecessary to separate them with semicolon and prefixing should be enough. My problem with semicolon separation is that with many enough configs it makes the list a little bit hard to interpret.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I can switch the semicolons to commas and provide an example that demonstrates tidy formatting. In case I've misunderstood your request, the reason for the separator character is that I want to provide a mechanism for users to add any number of key=value pairs to the list rather than mandating which configs are permitted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I can't switch the semicolons to commas because individual values can have commas. I can make sure the docs make it clear that readable formatting is easy and desirable.
private static final Class<StringDeserializer> DESERIALIZER_CLASS = StringDeserializer.class; | ||
|
||
// The hard-coded producer config. This is overridable. | ||
private static final Map<String, String> DEFAULT_PRODUCER_CONFIG = ImmutableMap.<String, String>builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All default configs should be part of cruisecontrol.properties
and we should avoid hard-coding (similarly to goals
and default.goals
). This would simplify your code as well as you wouldn't need to merge configs.
config.getShort(PersistedDataConfig.KAFKA_TOPIC_REPLICATION_FACTOR_CONFIG), | ||
config.getMap(PersistedDataConfig.KAFKA_TOPIC_ADDITIONAL_CONFIGS_MAP_CONFIG), | ||
config.getMap(PersistedDataConfig.KAFKA_PRODUCER_ADDITIONAL_CONFIGS_MAP_CONFIG), | ||
config.getMap(PersistedDataConfig.KAFKA_CONSUMER_ADDITIONAL_CONFIGS_MAP_CONFIG), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you only prefixed the configs, then you could get them the following way in an array and avoid duplicating functionality with getMap
:
config.originalsWithPrefix("persisted.data.kafka.topic."),
config.originalsWithPrefix("persisted.data.kafka.producer."),
config.originalsWithPrefix("persisted.data.kafka.consumer."),
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, makes sense. Thanks for the examples.
* @param persistedMap The map to store {@link Executor} data in. | ||
*/ | ||
public ExecutorPersistedData(Map<String, String> persistedMap) { | ||
this._demotedOrRemovedBrokers = Namespace.EXECUTOR.embed( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, you do this because this way we can serialize demoted or removed brokers with their timestamp the same way in both cases (in-memory vs Kafka).
To be completely honest, I find this a bit less intuitive to understand at first. For this I'd like to propose that what if we move the type of serialization to the respective storage method? For instance we could have a class like this:
BrokerEvent implements Serializable {
private String brokerId;
private long timestamp;
private EventType eventType; // demote or remove
private Namespace namespace; // currently "executor" only
}
For memory serialization we could use the standard ByteArrayInput/OutputStream serialization while for Kafka topics we could use json that is serialized into Strings. That would make the Kafka topics easily debuggable too.
When deserialized, we can represent them in a Map<String, BrokerEvent> structure to map broker ID to the event to make it easier to query.
Let me know your thoughts.
…attern. Removed the, now-unneeded, Map config value type.
This PR implements #2109.